/**
 * QUANSHI.com Inc.
 * Copyright (c) 2016-2017 All Rights Reserved.
 */
package com.quanshi.scheduler.core.executor;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;

import com.quanshi.scheduler.core.biz.ExecutorBiz;
import com.quanshi.scheduler.core.biz.impl.ExecutorBizImpl;
import com.quanshi.scheduler.core.handler.IJobHandler;
import com.quanshi.scheduler.core.handler.annotation.JobHandler;
import com.quanshi.scheduler.core.rpc.netcom.jetty.NetComServerFactory;
import com.quanshi.scheduler.core.thread.ExecutorRegistryThread;
import com.quanshi.scheduler.core.thread.JobThread;
import com.quanshi.scheduler.core.thread.TriggerCallbackThread;
import com.quanshi.scheduler.core.utils.AdminApiUtils;

/**
 * Job executor service.
 *
 * <p>Holds the executor's connection settings, starts and stops the embedded
 * executor server together with its helper threads, and keeps two static
 * registries: job handlers keyed by name and running job threads keyed by job id.
 *
 * @author chinaxiang
 * @version 2017-07-07 23:16:55
 */
public class IJobExecutor implements ApplicationContextAware, ApplicationListener<ApplicationEvent> {

    private static final Logger logger = LoggerFactory.getLogger(IJobExecutor.class);

    /** IP handed to the executor server on start. */
    private String              ip;
    /** Port handed to the executor server on start; defaults to 9999. */
    private int                 port   = 9999;
    /** Application name handed to the executor server on start. */
    private String              appName;
    /** Admin address string handed to {@link AdminApiUtils#init}. */
    private String              adminAddresses;
    /** Log path; static, so it is shared by every executor instance. */
    public static String        logPath;

    /**
     * Sets the IP passed to the executor server and logs the value.
     *
     * @param ip the executor IP
     */
    public void setIp(String ip) {
        logger.info( "ip:{}.", ip );
        this.ip = ip;
    }

    /**
     * Sets the port passed to the executor server and logs the value.
     *
     * @param port the executor port
     */
    public void setPort(int port) {
        logger.info( "port:{}.", port );
        this.port = port;
    }

    /**
     * Sets the application name passed to the executor server and logs the value.
     *
     * @param appName the executor application name
     */
    public void setAppName(String appName) {
        logger.info( "appName:{}.", appName );
        this.appName = appName;
    }

    /**
     * Sets the admin address string used to initialise {@link AdminApiUtils} and logs the value.
     *
     * @param adminAddresses the admin address string
     */
    public void setAdminAddresses(String adminAddresses) {
        logger.info( "adminAddresses:{}.", adminAddresses );
        this.adminAddresses = adminAddresses;
    }

    /**
     * Sets the log path and logs the value. The value is stored in the static
     * {@link #logPath} field, so it applies to all instances.
     *
     * @param logPath the log path
     */
    public void setLogPath(String logPath) {
        logger.info( "logPath:{}.", logPath );
        IJobExecutor.logPath = logPath;
    }

    // ---------------------------------- job server ------------------------------------
    private final NetComServerFactory serverFactory = new NetComServerFactory();

    /**
     * Starts the executor: initialises the admin API utilities, registers the
     * {@link ExecutorBiz} service implementation, starts the server with the
     * configured port, IP and application name, and finally starts the trigger
     * callback thread.
     *
     * @throws Exception if any of the start-up steps fails
     */
    public void start() throws Exception {
        // admin api util init
        AdminApiUtils.init(adminAddresses);

        // executor start
        NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl());
        serverFactory.start(port, ip, appName);

        // trigger callback thread start
        TriggerCallbackThread.getInstance().start();
    }

    /**
     * Shuts the executor down: stops the registry thread, destroys the server,
     * stops and interrupts every registered job thread (then empties the
     * registry), and stops the trigger callback thread.
     */
    public void destroy() {
        // 1. executor registry thread stop
        ExecutorRegistryThread.getInstance().toStop();

        // 2. executor stop
        serverFactory.destroy();

        // 3. job thread repository destroy (iterating an empty map is a no-op)
        for (JobThread jobThread : jobThreadRepository.values()) {
            jobThread.toStop("容器销毁终止");
            jobThread.interrupt();
        }
        jobThreadRepository.clear();

        // 4. trigger callback thread stop
        TriggerCallbackThread.getInstance().toStop();
    }

    // ---------------------------------- init job handler ------------------------------------
    /** Application context captured in {@link #setApplicationContext}; static and publicly readable. */
    public static ApplicationContext applicationContext;

    /**
     * Stores the application context and registers every bean that is annotated
     * with {@link JobHandler} and implements {@link IJobHandler}, using the
     * annotation's {@code value()} as the handler name.
     *
     * <p>The annotation is resolved through
     * {@code ApplicationContext.findAnnotationOnBean} rather than through the
     * bean instance's runtime class, because a proxied bean's runtime class may
     * not carry the annotation and reading it directly would fail with a
     * {@link NullPointerException}.
     *
     * @param applicationContext the context this bean runs in
     * @throws BeansException if a bean lookup fails
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        IJobExecutor.applicationContext = applicationContext;

        // init job handler action
        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
        if (serviceBeanMap == null || serviceBeanMap.isEmpty()) {
            return;
        }

        for (Map.Entry<String, Object> beanEntry : serviceBeanMap.entrySet()) {
            Object serviceBean = beanEntry.getValue();
            if (!(serviceBean instanceof IJobHandler)) {
                continue;
            }

            String beanName = beanEntry.getKey();
            JobHandler annotation = applicationContext.findAnnotationOnBean(beanName, JobHandler.class);
            if (annotation == null) {
                // Defensive: never dereference a missing annotation.
                logger.warn("job handler annotation not found on bean, beanName:{}, skip regist.", beanName);
                continue;
            }

            registJobHandler(annotation.value(), (IJobHandler) serviceBean);
        }
    }

    /**
     * Logs a message when the application context is closed; all other events
     * are ignored.
     *
     * @param applicationEvent the event published by the application context
     */
    @Override
    public void onApplicationEvent(ApplicationEvent applicationEvent) {
        if (applicationEvent instanceof ContextClosedEvent) {
            logger.info( "application context closed." );
        }
    }

    // ---------------------------------- job handler repository
    private static final ConcurrentHashMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();

    /**
     * Registers a job handler under the given name, replacing any handler
     * previously registered under that name.
     *
     * @param name       the handler name
     * @param jobHandler the handler to register
     * @return the handler previously registered under {@code name}, or {@code null} if there was none
     */
    public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {
        logger.info("job regist job handler success, name:{}, jobHandler:{}", name, jobHandler);
        return jobHandlerRepository.put(name, jobHandler);
    }

    /**
     * Looks up a registered job handler.
     *
     * @param name the handler name
     * @return the registered handler, or {@code null} if none is registered under {@code name}
     */
    public static IJobHandler loadJobHandler(String name) {
        return jobHandlerRepository.get(name);
    }

    // ---------------------------------- job thread repository
    private static final ConcurrentHashMap<Long, JobThread> jobThreadRepository = new ConcurrentHashMap<Long, JobThread>();

    /**
     * Creates and starts a new {@link JobThread} for the job and stores it in the
     * registry. If another thread was already registered for the same job id it
     * is told to stop with {@code removeOldReason} and interrupted.
     *
     * @param jobId           the job id
     * @param handler         the handler the new thread runs
     * @param removeOldReason the stop reason passed to a replaced thread
     * @return the newly started job thread
     */
    public static JobThread registJobThread(long jobId, IJobHandler handler, String removeOldReason) {
        JobThread newJobThread = new JobThread(jobId, handler);
        newJobThread.start();
        logger.info("job regist JobThread success, jobId:{}, handler:{}.", jobId, handler);

        // put (not putIfAbsent): the new thread must replace the old one, and
        // put returns the previous mapping so it can be stopped here.
        JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);
        if (oldJobThread != null) {
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        }

        return newJobThread;
    }

    /**
     * Removes the job thread registered for the job id, if any, tells it to stop
     * with the given reason and interrupts it.
     *
     * @param jobId           the job id
     * @param removeOldReason the stop reason passed to the removed thread
     */
    public static void removeJobThread(long jobId, String removeOldReason) {
        JobThread oldJobThread = jobThreadRepository.remove(jobId);
        if (oldJobThread != null) {
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        }
    }

    /**
     * Looks up the job thread registered for the job id.
     *
     * @param jobId the job id
     * @return the registered job thread, or {@code null} if none is registered
     */
    public static JobThread loadJobThread(long jobId) {
        return jobThreadRepository.get(jobId);
    }
}
